11#include <zypp-core/base/DtorReset>
15#include <zypp-core/zyppng/pipelines/AsyncResult>
20#include <zypp-core/zyppng/base/AutoDisconnect>
21#include <zypp-core/zyppng/base/EventDispatcher>
22#include <zypp-media/MediaConfig>
27#undef ZYPP_BASE_LOGGER_LOGGROUP
28#define ZYPP_BASE_LOGGER_LOGGROUP "ProvideWorker"
80 if ( !
_controlIO->openFds( { recv }, send ) ) {
122 return std::make_shared<ProvideWorkerItem>( std::move(spec) );
132 ERR <<
"Failed to send ProvideStart message" << std::endl;
138 MIL_PRV <<
"Sending provideSuccess for id " <<
id <<
" file " <<
localFile << std::endl;
140 for (
auto i =
extra.beginList ();
i !=
extra.endList();
i++ ) {
141 for (
const auto &val :
i->second )
142 msg.addValue(
i->first, val );
144 if ( !
_stream->sendMessage( msg ) ) {
145 ERR <<
"Failed to send ProvideSuccess message" << std::endl;
151 MIL_PRV <<
"Sending provideFailed for request " <<
id <<
" err: " <<
reason << std::endl;
153 for (
auto i =
extra.beginList ();
i !=
extra.endList();
i++ ) {
154 for (
const auto &val :
i->second )
155 msg.addValue(
i->first, val );
157 if ( !
_stream->sendMessage( msg ) ) {
158 ERR <<
"Failed to send ProvideFailed message" << std::endl;
166 if ( !
e.historyEmpty() ) {
179 MIL_PRV <<
"Sending attachSuccess for request " <<
id << std::endl;
181 ERR <<
"Failed to send AttachFinished message" << std::endl;
183 MIL <<
"Sent back attach success" << std::endl;
189 MIL_PRV <<
"Sending detachSuccess for request " <<
id << std::endl;
191 ERR <<
"Failed to send DetachFinished message" << std::endl;
209 const auto &msg =
_stream->nextMessageWait() | [&](
auto &&nextMessage ) {
210 if ( !nextMessage ) {
222 ERR <<
"Failed to receive message" << std::endl;
231 MIL <<
"Remembering message for later: " << msg->code () << std::endl;
243 MIL <<
"Failed to wait for message, aborting the request " << std::endl;
247 if (
m->code() == ProvideMessage::Code::MediaChanged )
249 else if (
m->code() == ProvideMessage::Code::MediaChangeSkip )
260 if (
m.code() == ProvideMessage::Code::AuthInfo ) {
263 for( const auto &hdr : m.headers () ) {
264 if ( hdr.first == AuthInfoMsgFields::Username ) {
265 inf.username = hdr.second.asString();
266 } else if ( hdr.first == AuthInfoMsgFields::Password ) {
267 inf.password = hdr.second.asString();
268 } else if ( hdr.first == AuthInfoMsgFields::AuthTimestamp ) {
269 inf.last_auth_timestamp = hdr.second.asInt64();
271 if ( !hdr.second.isString() ) {
272 ERR <<
"Ignoring invalid extra value, " << hdr.first <<
" is not of type string" << std::endl;
274 inf.extraKeys[hdr.first] = hdr.second.asString();
286 return *_controlIO.get();
291 const auto &
helo = _stream->nextMessageWait();
293 ERR <<
"Could not receive a handshake message, aborting" << std::endl;
299 invalidMessageReceived(
exp.error() );
303 return std::move(*
exp) | [&](
auto conf ) {
305 _workerConf = std::move(
conf);
308 for(
const auto &[key,value] : _workerConf ) {
310 if (
keyUrl.getScheme() ==
"zconfig" &&
keyUrl.getAuthority() ==
"main" ) {
317 caps.set_worker_name( _workerName.data() );
320 if ( !_stream->sendMessage (
caps ) ) {
328 void ProvideWorker::messageLoop(
Timer & )
333 while ( _pendingMessages.size () ) {
334 auto m = _pendingMessages.front ();
335 _pendingMessages.pop_front ();
336 handleSingleMessage(
m);
339 if ( !_fatalError && _pendingProvides.size() ) {
344 if ( !_fatalError && ( _pendingMessages.size() || ( _pendingProvides.size () && _provNotificationMode == QUEUE_NOT_EMTPY ) ) ) {
350 void ProvideWorker::maybeDelayedShutdown()
352 if ( _inControllerRequest ) {
353 _delayedShutdown->start(0);
363 MIL <<
"Read FD closed, exiting." << std::endl;
364 maybeDelayedShutdown();
369 MIL <<
"Write FD closed, exiting." << std::endl;
370 maybeDelayedShutdown();
373 void ProvideWorker::messageReceived()
375 while (
auto message = _stream->nextMessage() ) {
378 pushSingleMessage(*message);
382 void ProvideWorker::onInvalidMessageReceived()
384 invalidMessageReceived( std::exception_ptr() );
387 void ProvideWorker::invalidMessageReceived( std::exception_ptr
p )
389 ERR <<
"Received a invalid message on the input stream, aborting" << std::endl;
400 const auto code = provide.code();
402 if ( code >= ProvideMessage::Code::FirstControllerCode && code <= ProvideMessage::Code::LastControllerCode ) {
404 MIL_PRV <<
"Received request: " << code << std::endl;
406 if ( code == ProvideMessage::Code::Cancel ) {
407 const auto &
i = std::find_if( _pendingProvides.begin (), _pendingProvides.end(), [
id = provide.requestId() ](
const auto &
it ){ return it->_spec.requestId() == id; } );
408 if (
i != _pendingProvides.end() ) {
409 switch ( (*i)->_state ) {
410 case ProvideWorkerItem::Pending:
412 _pendingProvides.erase(
i);
414 case ProvideWorkerItem::Running:
417 case ProvideWorkerItem::Finished:
420 MIL <<
"Received Cancel for unknown request: " << provide.requestId() <<
", ignoring!" << std::endl;
428 ERR <<
"Unsupported request with code: " << code <<
" received!" << std::endl;
434 return parseReceivedMessage (message )
436 _pendingMessages.push_back(provide);
445 const auto &
exp = handle( message );
448 std::rethrow_exception (
exp.error () );
450 ERR <<
"Catched exception during message handling: " <<
e << std::endl;
451 }
catch (
const std::exception &
e ) {
452 ERR <<
"Catched exception during message handling: " <<
e.what()<< std::endl;
454 ERR <<
"Unknown Exception during message handling" << std::endl;
463 invalidMessageReceived(
exp.error() );
Reference counted access to a Tp object calling a custom Dispose function when the last AutoDispose h...
Assign a variable a certain value when going out of scope.
Base class for Exception.
Command frame for communication with PluginScript.
const std::string & command() const
Return the frame command.
static LogControl instance()
Singleton access.
std::string basename() const
Return the last component of this path.
SignalProxy< void(uint, AsyncDataSource::ChannelCloseReason)> sigReadFdClosed()
SignalProxy< void(AsyncDataSource::ChannelCloseReason)> sigWriteFdClosed()
static auto connectFunc(typename internal::MemberFunction< SenderFunc >::ClassType &s, SenderFunc &&sFun, ReceiverFunc &&rFunc, const Tracker &...trackers)
static auto connect(typename internal::MemberFunction< SenderFunc >::ClassType &s, SenderFunc &&sFun, typename internal::MemberFunction< ReceiverFunc >::ClassType &recv, ReceiverFunc &&rFunc)
static ProvideMessage createProvideStarted(const uint32_t reqId, const zypp::Url &url, const std::optional< std::string > &localFilename={}, const std::optional< std::string > &stagingFilename={})
static ProvideMessage createProvideFinished(const uint32_t reqId, const std::string &localFilename, bool cacheHit)
static ProvideMessage createMediaChangeRequest(const uint32_t reqId, const std::string &label, int32_t mediaNr, const std::vector< std::string > &devices, const std::optional< std::string > &desc)
static ProvideMessage createAuthDataRequest(const uint32_t reqId, const zypp::Url &effectiveUrl, const std::string &lastTriedUser="", const std::optional< int64_t > &lastAuthTimestamp={}, const std::map< std::string, std::string > &extraValues={})
static ProvideMessage createErrorResponse(const uint32_t reqId, const Code code, const std::string &reason, bool transient=false)
static ProvideMessage createDetachFinished(const uint32_t reqId)
static expected< ProvideMessage > create(const zypp::PluginFrame &message)
static ProvideMessage createAttachFinished(const uint32_t reqId, const std::optional< std::string > &localMountPoint={})
SignalProxy< void()> sigInvalidMessageReceived()
SignalProxy< void()> sigMessageReceived()
static Ptr create(IODevice::Ptr iostr)
The Timer class provides repetitive and single-shot timers.
SignalProxy< void(Timer &t)> sigExpired()
This signal is always emitted when the timer expires.
static expected success(ConsParams &&...params)
void detachSuccess(const uint32_t id)
std::deque< ProvideWorkerItemRef > _pendingProvides
MediaChangeRes requestMediaChange(const uint32_t id, const std::string &label, const int32_t mediaNr, const std::vector< std::string > &devices, const std::optional< std::string > &desc={})
void messageLoop(Timer &)
void attachSuccess(const uint32_t id, const std::optional< std::string > &localMountPoint={})
ProvideWorker(std::string_view workerName)
expected< void > executeHandshake()
AsyncDataSource::Ptr _controlIO
void onInvalidMessageReceived()
void provideStart(const uint32_t id, const zypp::Url &url, const zypp::Pathname &localFile, const zypp::Pathname &stagingFile={})
StompFrameStreamRef messageStream() const
std::deque< ProvideMessage > _pendingMessages
void writeFdClosed(AsyncDataSource::ChannelCloseReason)
expected< ProvideMessage > parseReceivedMessage(const zypp::PluginFrame &m)
Timer::Ptr _delayedShutdown
void maybeDelayedShutdown()
void provideSuccess(const uint32_t id, bool cacheHit, const zypp::Pathname &localFile, const HeaderValueMap extra={})
ProvideNotificatioMode provNotificationMode() const
expected< void > run(int recv=STDIN_FILENO, int send=STDOUT_FILENO)
virtual ProvideWorkerItemRef makeItem(ProvideMessage &&spec)
void setProvNotificationMode(const ProvideNotificatioMode &provNotificationMode)
void provideFailed(const uint32_t id, const ProvideMessage::Code code, const std::string &reason, const bool transient, const HeaderValueMap extra={})
void readFdClosed(uint, AsyncDataSource::ChannelCloseReason)
StompFrameStreamRef _stream
std::deque< ProvideWorkerItemRef > & requestQueue()
ProvideNotificatioMode _provNotificationMode
bool _inControllerRequest
expected< AuthInfo > requireAuthorization(const uint32_t id, const zypp::Url &url, const std::string &lastTriedUsername="", const int64_t lastTimestamp=-1, const std::map< std::string, std::string > &extraFields={})
expected< ProvideMessage > sendAndWaitForResponse(const ProvideMessage &request, const std::vector< uint > &responseCodes)
std::exception_ptr _fatalError
Easy-to use interface to the ZYPP dependency resolver.
constexpr std::string_view History("history")
auto and_then(Fun &&function)
ResultType or_else(const expected< T, E > &exp, Function &&f)
ResultType and_then(const expected< T, E > &exp, Function &&f)
Convenient building of std::string via std::ostringstream Basically a std::ostringstream autoconverti...
static ZYPP_API ThreadData & current()
#define ZYPP_EXCPT_PTR(EXCPT)
Drops a logline and returns Exception as a std::exception_ptr.